本文为您介绍如何编写和使用UDF。
背景信息
自2.2.0版本起,StarRocks支持使用Java语言编写用户定义函数(User Defined Function,简称UDF)。
自3.0版本起,StarRocks支持Global UDF,您只需要在相关的SQL语句(CREATE/SHOW/DROP)中加上GLOBAL
关键字,该语句即可全局生效,无需逐个为每个数据库执行此语句。您可以根据业务场景开发自定义函数,扩展StarRocks的函数能力。
目前StarRocks支持的UDF包括:
用户自定义标量函数(Scalar UDF)
用户自定义聚合函数(User Defined Aggregation Function,UDAF)
用户自定义窗口函数(User Defined Window Function,UDWF)
用户自定义表格函数(User Defined Table Function,UDTF)
前提条件
使用StarRocks的Java UDF功能前,您需要:
安装Apache Maven以创建并编写相关Java项目。
在服务器上安装JDK 1.8。
开启UDF功能。在实例配置页面,设置FE配置项
enable_udf
为TRUE
,并重启实例使配置项生效。
类型映射关系
SQL TYPE | Java TYPE |
BOOLEAN | java.lang.Boolean |
TINYINT | java.lang.Byte |
SMALLINT | java.lang.Short |
INT | java.lang.Integer |
BIGINT | java.lang.Long |
FLOAT | java.lang.Float |
DOUBLE | java.lang.Double |
STRING/VARCHAR | java.lang.String |
开发并使用UDF
您需要创建Maven项目并使用Java语言编写相应功能。
步骤一:创建Maven项目
创建Maven项目,项目的基本目录结构如下。
project
|--pom.xml
|--src
| |--main
| | |--java
| | |--resources
| |--test
|--target
步骤二:添加依赖
在pom.xml中添加如下依赖。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>udf</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
步骤三:开发UDF
您需要使用Java语言开发相应UDF。
开发Scalar UDF
Scalar UDF,即用户自定义标量函数,可以对单行数据进行操作,输出单行结果。当您在查询时使用Scalar UDF,每行数据最终都会按行出现在结果集中。典型的标量函数包括UPPER
、LOWER
、ROUND
、ABS
。
以下示例以提取JSON数据功能为例进行说明。例如,业务场景中,JSON数据中某个字段的值可能是JSON字符串而不是JSON对象,因此在提取JSON字符串时,SQL语句需要嵌套调用GET_JSON_STRING
,即GET_JSON_STRING(GET_JSON_STRING('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key"), "$.k0")
。
为简化SQL语句,您可以开发一个UDF,直接提取JSON字符串,例如:MY_UDF_JSON_GET('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key.k0")
。
package com.starrocks.udf.sample;
import com.alibaba.fastjson.JSONPath;
public class UDFJsonGet {
public final String evaluate(String jsonObj, String key) {
if (obj == null || key == null) return null;
try {
// JSONPath库可以全部展开,即使某个字段的值是JSON格式的字符串
return JSONPath.read(jsonObj, key).toString();
} catch (Exception e) {
return null;
}
}
}
用户自定义类必须实现如下方法。
方法中请求参数和返回参数的数据类型,需要和步骤六中的CREATE FUNCTION
语句中声明的相同,且两者的类型映射关系需要符合类型映射关系。
方法 | 含义 |
TYPE1 evaluate(TYPE2, ...) |
|
开发UDAF
UDAF,即用户自定义的聚合函数,对多行数据进行操作,输出单行结果。典型的聚合函数包括SUM
、COUNT
、MAX
、MIN
,这些函数对于每个GROUP BY分组中多行数据进行聚合后,只输出一行结果。
以下示例以MY_SUM_INT
函数为例进行说明。与内置函数SUM
(返回值为BIGINT类型)区别在于,MY_SUM_INT
函数支持传入参数和返回参数的类型为INT。
package com.starrocks.udf.sample;
public class SumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
}
public State create() {
return new State();
}
public void destroy(State state) {
}
public final void update(State state, Integer val) {
if (val != null) {
state.counter+= val;
}
}
public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}
public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}
public Integer finalize(State state) {
return state.counter;
}
}
用户自定义类必须实现如下方法。
方法中传入参数和返回参数的数据类型,需要和步骤六中的CREATE FUNCTION
语句中声明的相同,且两者的类型映射关系需要符合类型映射关系。
方法 | 含义 |
State create() | 创建State。 |
void destroy(State) | 销毁State。 |
void update(State, ...) | 更新State。其中第一个参数是State,其余的参数是函数声明的输入参数,可以为1个或多个。 |
void serialize(State, ByteBuffer) | 序列化State。 |
void merge(State, ByteBuffer) | 合并State和反序列化State。 |
TYPE finalize(State) | 通过State获取函数的最终结果。 |
并且,开发UDAF函数时,您需要使用缓冲区类java.nio.ByteBuffer
和局部变量serializeLength
,用于保存和表示中间结果,指定中间结果的序列化长度。
类和局部变量 | 说明 |
java.nio.ByteBuffer() | 缓冲区类,用于保存中间结果。由于中间结果在不同执行节点间传输时,会进行序列化和反序列化,因此还需要使用serializeLength指定中间结果序列化后的长度。 |
serializeLength() | 中间结果序列化后的长度,单位为Byte。serializeLength的数据类型固定为INT。例如,示例中 |
java.nio.ByteBuffer
序列化相关事项:
不支持依赖ByteBuffer的remaining()方法来反序列化State。
不支持对ByteBuffer调用clear()方法。
serializeLength
需要与实际写入数据的长度保持一致,否则序列化和反序列化过程中会造成结果错误。
开发UDWF
UDWF,即用户自定义窗口函数。跟普通聚合函数不同的是,窗口函数针对一组行(一个窗口)计算值,并为每行返回一个结果。一般情况下,窗口函数包含OVER
子句,将数据行拆分成多个分组,窗口函数基于每一行数据所在的组(一个窗口)进行计算,并为每行返回一个结果。
以下示例以MY_WINDOW_SUM_INT
函数为例进行说明。与内置函数SUM
(返回类型为BIGINT)区别在于,MY_WINDOW_SUM_INT
函数支持传入参数和返回参数的类型为INT。
package com.starrocks.udf.sample;
public class WindowSumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
@Override
public String toString() {
return "State{" +
"counter=" + counter +
'}';
}
}
public State create() {
return new State();
}
public void destroy(State state) {
}
public void update(State state, Integer val) {
if (val != null) {
state.counter+=val;
}
}
public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}
public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}
public Integer finalize(State state) {
return state.counter;
}
public void reset(State state) {
state.counter = 0;
}
public void windowUpdate(State state,
int peer_group_start, int peer_group_end,
int frame_start, int frame_end,
Integer[] inputs) {
for (int i = (int)frame_start; i < (int)frame_end; ++i) {
state.counter += inputs[i];
}
}
}
用户自定义类必须实现UDAF所需要的方法(窗口函数是特殊聚合函数)以及windowUpdate()方法。
方法中请求参数和返回参数的数据类型,需要和步骤六中的CREATE FUNCTION
语句中声明的相同,且两者的类型映射关系需要符合类型映射关系。
需要额外实现的方法
方法 | 含义 |
| 更新窗口数据。窗口函数的详细说明,请参见窗口函数。输入每一行数据,都会获取到对应窗口信息来更新中间结果。
|
开发UDTF
UDTF,即用户自定义表值函数,读入一行数据,输出多个值可视为一张表。表值函数常用于实现行转列。
目前UDTF只支持返回多行单列。
以下示例以MY_UDF_SPLIT
函数为例进行说明。MY_UDF_SPLIT
函数支持分隔符为空格,传入参数和返回参数的类型为STRING。
package com.starrocks.udf.sample;
public class UDFSplit{
public String[] process(String in) {
if (in == null) return null;
return in.split(" ");
}
}
用户自定义类必须实现如下方法。
方法中请求参数和返回参数的数据类型,需要和步骤六中的CREATE FUNCTION
语句中声明的相同,且两者的类型映射关系需要符合类型映射关系。
方法 | 含义 |
TYPE[] process() |
|
步骤四:打包Java项目
通过以下命令打包Java项目。
mvn package
target目录下会生成两个文件:udf-1.0-SNAPSHOT.jar
和udf-1.0-SNAPSHOT-jar-with-dependencies.jar
。
步骤五:上传项目
将文件udf-1.0-SNAPSHOT-jar-with-dependencies.jar
上传到OSS上,并开放JAR包的公共读权限。详情请参见简单上传、设置Bucket ACL。
步骤六中,FE会对UDF所在JAR包进行校验并计算校验值,BE会下载UDF所在JAR包并执行。
步骤六:在StarRocks中创建UDF
StarRocks内提供了两种Namespace的UDF:一种是Database级Namespace,一种是Global级Namespace。
如果您没有特殊的UDF可见性隔离需求,您可以直接选择创建Global UDF。在引用Global UDF时,直接调用Function Name即可,无需任何Catalog和Database作为前缀,访问更加便捷。
如果您有特殊的UDF可见性隔离需求,或者需要在不同Database下创建同名UDF,那么你可以选择在Database内创建UDF。此时,如果您的会话在某个Database内,您可以直接调用Function Name即可;如果您的会话在其他Catalog和Database下,那么您需要带上Catalog和Database前缀,例如:
catalog.database.function
。
创建Global UDF需要有System级的CREATE GLOBAL FUNCTION权限;创建数据库级别的UDF需要有数据库级的CREATE FUNCTION权限;使用UDF需要有对应UDF的USAGE权限。关于如何赋权,参见GRANT。
JAR包上传完成后,您需要在StarRocks中,按需创建相应的UDF。如果创建Global UDF,只需要在SQL语句中带上GLOBAL
关键字即可。
语法
CREATE [GLOBAL][AGGREGATE | TABLE] FUNCTION function_name(arg_type [, ...])
RETURNS return_type
[PROPERTIES ("key" = "value" [, ...]) ]
参数说明
参数 | 必选 | 说明 |
GLOBAL | 否 | 如需创建全局UDF,需指定该关键字。从3.0版本开始支持。 |
AGGREGATE | 否 | 如要创建UDAF和UDWF,需指定该关键字。 |
TABLE | 否 | 如要创建UDTF,需指定该关键字。 |
function_name | 是 | 函数名,可以包含数据库名称,比如, |
arg_type | 是 | 函数的参数类型。具体支持的数据类型,请参见类型映射关系。 |
return_type | 是 | 函数的返回值类型。具体支持的数据类型,请参见类型映射关系。 |
properties | 是 | 函数相关属性。创建不同类型的UDF需配置不同的属性,详情和示例请参考以下示例。 |
创建Scalar UDF
执行如下命令,在StarRocks中创建之前示例中的Scalar UDF。
CREATE [GLOBAL] FUNCTION MY_UDF_JSON_GET(string, string)
RETURNS string
PROPERTIES (
"symbol" = "com.starrocks.udf.sample.UDFJsonGet",
"type" = "StarrocksJar",
"file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);
参数 | 描述 |
symbol | UDF所在项目的类名。格式为 |
type | 用于标记所创建的UDF类型。取值为 |
file | UDF所在JAR包的HTTP路径,配置成OSS包含对应内网Endpoint的HTTP URL。格式为 |
创建UDAF
执行如下命令,在StarRocks中创建之前示例中的UDAF。
CREATE [GLOBAL] AGGREGATE FUNCTION MY_SUM_INT(INT)
RETURNS INT
PROPERTIES
(
"symbol" = "com.starrocks.udf.sample.SumInt",
"type" = "StarrocksJar",
"file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);
PROPERTIES
里的参数说明与创建Scalar UDF相同。
创建UDWF
执行如下命令,在StarRocks中创建先前示例中的UDWF。
CREATE [GLOBAL] AGGREGATE FUNCTION MY_WINDOW_SUM_INT(Int)
RETURNS Int
PROPERTIES
(
"analytic" = "true",
"symbol" = "com.starrocks.udf.sample.WindowSumInt",
"type" = "StarrocksJar",
"file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);
analytic
:所创建的函数是否为窗口函数,固定取值为true
。其他参数说明与创建Scalar UDF相同。
创建UDTF
执行如下命令,在StarRocks中创建先前示例中的UDTF。
CREATE [GLOBAL] TABLE FUNCTION MY_UDF_SPLIT(string)
RETURNS string
PROPERTIES
(
"symbol" = "com.starrocks.udf.sample.UDFSplit",
"type" = "StarrocksJar",
"file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);
PROPERTIES
里的参数说明与创建Scalar UDF相同。
步骤七:使用UDF
创建完成后,您可以测试使用您开发的UDF。
使用Scalar UDF
执行如下命令,使用步骤六创建的Scalar UDF函数。
SELECT MY_UDF_JSON_GET('{"key":"{\\"in\\":2}"}', '$.key.in');
使用UDAF
执行如下命令,使用步骤六创建的UDAF函数。
SELECT MY_SUM_INT(col1);
使用UDWF
执行如下命令,使用步骤六创建的UDWF函数。
SELECT MY_WINDOW_SUM_INT(intcol)
OVER (PARTITION BY intcol2
ORDER BY intcol3
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
FROM test_basic;
使用UDTF
执行如下命令,使用先前示例中的UDTF。
-- 假设存在表 t1,其列 a、b、c1 信息如下。
SELECT t1.a,t1.b,t1.c1 FROM t1;
> output:
1,2.1,"hello world"
2,2.2,"hello UDTF."
-- 使用 MY_UDF_SPLIT() 函数。
SELECT t1.a,t1.b, MY_UDF_SPLIT FROM t1, MY_UDF_SPLIT(t1.c1);
> output:
1,2.1,"hello"
1,2.1,"world"
2,2.2,"hello"
2,2.2,"UDTF."
第一个
MY_UDF_SPLIT
为调用MY_UDF_SPLIT
后生成的列别名。暂不支持使用
AS t2(f1)
的方式指定表格函数返回表的表别名和列别名。
查看UDF信息
运行以下命令查看UDF信息。
SHOW [GLOBAL] FUNCTIONS;
删除UDF
运行以下命令删除指定的UDF。
DROP [GLOBAL] FUNCTION <function_name>(arg_type [, ...]);
FAQ
Q:开发UDF时是否可以使用静态变量?不同UDF间的静态变量间否会互相影响?
A:支持在开发UDF时使用静态变量,且不同UDF间(即使类同名),静态变量是互相隔离的,不会互相影响。